Skip to content

Conversation

rkhachatryan
Copy link
Contributor

@rkhachatryan rkhachatryan commented Sep 30, 2025

This PR adds a new version of SinkUpsertMaterializer based on Sequenced MultiSet state introduced in #27071.

The existing operator is not changed.

Test coverage is provided by parameterized the tests extended in #27068.

@flinkbot
Copy link
Collaborator

flinkbot commented Sep 30, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@rkhachatryan rkhachatryan changed the title [DRAFT][FLINK-38461] Introduce SinkUpsertMaterializerV2 [FLINK-38461] Introduce SinkUpsertMaterializerV2 Oct 15, 2025
@rkhachatryan rkhachatryan marked this pull request as ready for review October 15, 2025 07:47
@1996fanrui 1996fanrui self-assigned this Oct 15, 2025
Copy link
Member

@1996fanrui 1996fanrui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rkhachatryan for the pr!

Overall LGTM, I left some comments, please take a look when you are available, thanks

TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY =
key("table.exec.sink.upsert-materialize.strategy")
.enumType(SinkUpsertMaterializeStrategy.class)
.noDefaultValue()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default is LEGACY, why noDefaultValue() here?

Comment on lines +171 to +186
"When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from MAP to VALUE. If not specified, Flink uses state-backend specific defaults.")
.linebreak()
.text("The option takes effect during job (re)starting")
.linebreak()
.build());

@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Long>
TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH =
key("table.exec.sink.upsert-materialize.adaptive.threshold.high")
.longType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from VALUE to MAP. If not specified, Flink uses state-backend specific defaults.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind mentioning the specific default value for each state backend?

.text(
"When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from VALUE to MAP. If not specified, Flink uses state-backend specific defaults.")
.linebreak()
.text("The option takes effect during job (re)starting")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not understand why mentioning "The option takes effect during job (re)starting". As I understand, almost all of config options take effect after job (re)starting.

Do you wanna point out that these 2 config options cannot be changed via dynamic conf?

+ Arrays.toString(indexMapping)
+ ", mutableRow="
+ row
+ '}';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: how about adding isNullAtNonProjected into toString() as well?

import org.apache.flink.table.planner.plan.utils.{AggregateUtil, OverAggregateUtil, SortUtil}
import org.apache.flink.table.planner.plan.utils.PythonUtil.isPythonAggregate
import org.apache.flink.table.planner.utils.ShortcutUtils
import org.apache.flink.table.typeutils.RowTypeUtils
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file does not change any code, why import is needed? by mistake or do I miss something?

upsertMaterialize,
// persist upsertMaterialize strategy separately in the compiled plan to make it immutable;
// later on, it can't be obtained from the node config because it is merged with the new environment
config.getOptional(TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY).orElse(null),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it can be null?

Comment on lines +378 to +383
config.getOptional(
TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH)
.orElse(null),
config.getOptional(
TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW)
.orElse(null),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
config.getOptional(
TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH)
.orElse(null),
config.getOptional(
TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW)
.orElse(null),
config.get(TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH),
config.get(TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW),

It they are not set, and no default value, get will return null directly.

: HashCodeGenerator.generateRowHash(
new CodeGeneratorContext(config, classLoader),
RowTypeUtils.projectRowType(physicalRowType, inputUpsertKey),
"",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about setting a name here?

Comment on lines +415 to +433
TTL_CONFIG, RowType.of(types), EQUALISER, UPSERT_KEY_EQUALISER, upsertKey);
case MAP:
return createV2(
types,
upsertKey,
SequencedMultiSetStateConfig.forMap(
PROCESSING_TIME, StateTtlConfig.DISABLED));
case VALUE:
return createV2(
types,
upsertKey,
SequencedMultiSetStateConfig.forValue(
PROCESSING_TIME, StateTtlConfig.DISABLED));
case ADAPTIVE:
return createV2(
types,
upsertKey,
SequencedMultiSetStateConfig.adaptive(
PROCESSING_TIME, 10L, 5L, StateTtlConfig.DISABLED));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about extracting a method to create StateTtlConfig via isTtlSupported() method?

It is helpful for keep the code consistency, only change isTtlSupported() in the future while supporting TTL.

Comment on lines +170 to +173
? ProjectedRowData.from(inputUpsertKey)
.withNullAtNonProjected(true)
.replaceRow(row)
: row,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have 2 comments about this part since it is a hot code path:

  1. withNullAtNonProjected will create a new ProjectedRowData object, so it created 2 ProjectedRowData objects for each row.
  2. ProjectedRowData is created on open in SinkUpsertMaterializer[1], and then it is reused by all rows. So how about create ProjectedRowData only once as well here.

[1]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants